Parameter Tuning
Adjusting the number of Map Tasks and Reduce Tasks
- Number of Map Tasks
  - When reading input files, the Map stage uses the InputFormat to compute the input splits
  - The split size is determined by three parameters:
    - dfs.blocksize: the HDFS block size
    - mapreduce.input.fileinputformat.split.minsize: minimum split size in bytes
    - mapreduce.input.fileinputformat.split.maxsize: maximum split size in bytes
  - The split size is computed as follows (a sketch of setting these bounds per job follows the code):

```java
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
}
```
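For example, shrinking the maximum split size produces more (and smaller) splits, and therefore more map tasks. A minimal sketch of setting the split bounds per job, assuming the `org.apache.hadoop.mapreduce.lib.input.FileInputFormat` helpers; the 64 MB cap is purely illustrative:

```java
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeExample {
    public static void configure(Job job) {
        // With a 128 MB block size, capping the max split at 64 MB roughly doubles
        // the number of map tasks: max(minSize, min(maxSize, blockSize)) = 64 MB.
        FileInputFormat.setMinInputSplitSize(job, 1L);                // mapreduce.input.fileinputformat.split.minsize
        FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024); // mapreduce.input.fileinputformat.split.maxsize
    }
}
```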
- Number of Reduce Tasks
  - The default number of reduce tasks per job is controlled by mapreduce.job.reduces
  - It can also be set per job in code with Job.setNumReduceTasks(int number); both approaches are sketched below
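A minimal sketch of the two approaches (the value 4 is illustrative only):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ReduceCountExample {
    public static Job build() throws Exception {
        Configuration conf = new Configuration();
        // Option 1: via the property (also settable in mapred-site.xml or with -D on the command line)
        conf.setInt("mapreduce.job.reduces", 4);

        Job job = Job.getInstance(conf, "reduce-count-example");
        // Option 2: per job in code; this overwrites the property in the job's configuration
        job.setNumReduceTasks(4);
        return job;
    }
}
```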
Fault-tolerance parameter tuning

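The details for this section are missing in the original. As a hedged sketch only, these are the fault-tolerance properties most commonly tuned; the values shown are the usual Hadoop 2.x defaults, given purely for illustration:

```java
import org.apache.hadoop.conf.Configuration;

public class FaultToleranceTuning {
    public static Configuration configure() {
        Configuration conf = new Configuration();
        // How many times a failed task attempt is retried before the task is declared failed
        conf.setInt("mapreduce.map.maxattempts", 4);
        conf.setInt("mapreduce.reduce.maxattempts", 4);
        // Percentage of map/reduce tasks allowed to fail without failing the whole job
        conf.setInt("mapreduce.map.failures.maxpercent", 0);
        conf.setInt("mapreduce.reduce.failures.maxpercent", 0);
        return conf;
    }
}
```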
Speculative-execution parameter tuning

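Again a sketch only: speculative execution launches backup attempts for slow (straggler) tasks and is controlled by two boolean properties:

```java
import org.apache.hadoop.conf.Configuration;

public class SpeculativeExecutionTuning {
    public static Configuration configure() {
        Configuration conf = new Configuration();
        // Enable/disable backup attempts for straggling map and reduce tasks
        conf.setBoolean("mapreduce.map.speculative", true);
        conf.setBoolean("mapreduce.reduce.speculative", true);
        return conf;
    }
}
```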
Memory parameter tuning

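A sketch of the usual memory-related knobs; the container sizes and heap options below are illustrative values, not recommendations:

```java
import org.apache.hadoop.conf.Configuration;

public class MemoryTuning {
    public static Configuration configure() {
        Configuration conf = new Configuration();
        // Memory requested from YARN for each map/reduce container, in MB
        conf.setInt("mapreduce.map.memory.mb", 1024);
        conf.setInt("mapreduce.reduce.memory.mb", 2048);
        // JVM heap of the task attempt; keep it somewhat below the container size
        conf.set("mapreduce.map.java.opts", "-Xmx820m");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1638m");
        return conf;
    }
}
```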
Advantages and disadvantages of MapReduce
- Strengths of MapReduce
  - Simple programming model
  - Highly scalable
  - Flexible
  - Fast (high throughput for batch workloads)
  - Parallel processing
  - Strong fault tolerance
- Weaknesses
  - Streaming data: the MapReduce processing model requires static input data
  - Real-time computation: not suited to low-latency processing that needs millisecond-level responses
  - Complex algorithms: e.g. SVM (support vector machines)
  - Iterative computation: e.g. the Fibonacci sequence, where each step depends on the previous result
MapReduce Programming
For environment setup, see the HDFS programming section.
WordCount example (Java version)
```java
package com.kun.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountApp {

    // Mapper: split each line into words and emit <word, 1>
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        LongWritable one = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(" ");
            for (String word : words) {
                context.write(new Text(word), one);
            }
        }
    }

    // Reducer: sum the counts for each word and emit <word, sum>
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration, "wordcount");
        job.setJarByClass(WordCountApp.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
Developing wc with IDEA + Maven:
1) Develop the code
2) Build: mvn clean package -DskipTests
3) Upload to the server: scp target/hadoop-train-XXX.jar
4) Run:
hadoop jar /home/hadoop/lib/hadoop-train-1.0.jar com.kun.hadoop.mapreduce.WordCountApp hdfs://hadoop:9000/hello.txt hdfs://hadoop:9000/output/wc
Note:
- Running the same code and script a second time fails with:
security.UserGroupInformation:
PriviledgedActionException as:hadoop (auth:SIMPLE) cause:
org.apache.hadoop.mapred.FileAlreadyExistsException:
Output directory hdfs://hadoop:9000/output/wc already exists
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException:
Output directory hdfs://hadoop:9000/output/wc already exists
- In MapReduce, the output directory must not already exist
1) Option one: delete the output directory by hand from the shell first
hadoop fs -rm -r /output/wc
2) Option two: delete it automatically in the code (the recommended approach)
Add the following in main(), right after the Configuration is created:

```java
Path outputPath = new Path(args[1]);
FileSystem fileSystem = FileSystem.get(configuration);
if (fileSystem.exists(outputPath)) {
    fileSystem.delete(outputPath, true);
    System.out.println("output path exists, but it has been deleted");
}
```
Combiner
- A "local reducer" that runs on the map side
- Reduces the volume of data the Map Tasks emit, and therefore the amount of data shuffled across the network

```java
package com.kun.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CombinerApp {

    // Mapper: split each line into words and emit <word, 1>
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        LongWritable one = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(" ");
            for (String word : words) {
                context.write(new Text(word), one);
            }
        }
    }

    // Reducer: sum the counts for each word; also reused as the combiner
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();

        // Delete the output path if it already exists
        Path outputPath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(configuration);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
            System.out.println("output path exists, but it has been deleted");
        }

        Job job = Job.getInstance(configuration, "wordcount");
        job.setJarByClass(CombinerApp.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // The combiner: run the reducer logic on the map side to shrink the shuffle data
        job.setCombinerClass(MyReducer.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
Whether the combine stage actually ran can be seen from the job counters while it runs.

hadoop jar /home/hadoop/lib/hadoop-train-1.0.jar com.kun.hadoop.mapreduce.CombinerApp hdfs://hadoop:9000/hello.txt hdfs://hadoop:9000/output/wc
When to use a combiner:
- Sums and counts: yes
- Averages: no (see the worked example below)
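A quick illustration of why a combiner breaks averages (numbers chosen for illustration): suppose one mapper emits the values 1 and 2 for a key and another mapper emits 3. The true average is (1 + 2 + 3) / 3 = 2, but if each combiner pre-averages its local values, the reducer sees 1.5 and 3 and computes (1.5 + 3) / 2 = 2.25. Sums and counts are safe because addition is associative and commutative, so pre-aggregating on the map side does not change the final result.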
Partitioner

Requirement: send records for the same phone brand to the same reducer
xiaomi 200
huawei 300
xiaomi 100
huawei 200
iphone7 300
iphone7 500
nokia 20
```java
package com.kun.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ParititonerApp {

    // Mapper: emit <brand, count> for each input line ("xiaomi 200" -> <xiaomi, 200>)
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(" ");
            context.write(new Text(words[0]), new LongWritable(Long.parseLong(words[1])));
        }
    }

    // Reducer: sum the counts per brand
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    // Partitioner: route each brand to a fixed reduce task
    public static class MyPartitioner extends Partitioner<Text, LongWritable> {

        @Override
        public int getPartition(Text key, LongWritable value, int numPartitions) {
            if (key.toString().equals("xiaomi")) {
                return 0;
            }
            if (key.toString().equals("huawei")) {
                return 1;
            }
            if (key.toString().equals("iphone7")) {
                return 2;
            }
            return 3; // everything else (e.g. nokia)
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();

        // Delete the output path if it already exists
        Path outputPath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(configuration);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
            System.out.println("output path exists, but it has been deleted");
        }

        Job job = Job.getInstance(configuration, "wordcount");
        job.setJarByClass(ParititonerApp.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Use the custom partitioner and one reduce task per partition
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(4);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
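One note on the design: the number of reduce tasks should match the number of partitions the partitioner can return (four here, indices 0 through 3). If getPartition returns an index greater than or equal to the number of reducers, the job fails with an illegal-partition error; conversely, with a single reducer the custom partitioner is effectively ignored because everything goes to partition 0.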
hadoop jar /home/hadoop/lib/hadoop-train-1.0.jar com.kun.hadoop.mapreduce.ParititonerApp hdfs://hadoop:9000/partitioner hdfs://hadoop:9000/output/partitioner
The four partition outputs are shown below:

When to use it:
- Custom partitioning can help spread the load when data is skewed
Enabling the JobHistory server
- Records information about completed MapReduce jobs into a specified HDFS directory
- Disabled by default
Edit hadoop_home/etc/hadoop/mapred-site.xml:
```xml
<property>
    <name>mapreduce.jobhistory.address</name>
    <value>hadoop:10020</value>
</property>
<property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>hadoop:19888</value>
</property>
<property>
    <name>mapreduce.jobhistory.done-dir</name>
    <value>/history/done</value>
</property>
<property>
    <name>mapreduce.jobhistory.intermediate-done-dir</name>
    <value>/history/done_intermediate</value>
</property>
```
Restart the YARN cluster.
Start the JobHistory server:
mr-jobhistory-daemon.sh start historyserver

You can now open the JobHistoryServer web UI.
The screenshots below walk through the JobHistoryServer UI: click "Finished", then click a completed job.
【YARN UI】

Click "History"
【YARN UI】

This opens the History page (the JobHistoryServer UI)
【JobHistoryServer UI】

Troubleshooting:
The History link requires extra configuration. Once configured, you can click through to the JobHistoryServer UI, but clicking "logs" there fails with the error shown in 【Error screenshot 1】; after fixing that, clicking "logs" in the YARN UI fails with 【Error screenshot 2】.
【Error screenshot 1】

After resolving the JobHistoryServer issue above, clicking "logs" in the YARN UI may still fail as shown below; fix it by configuring yarn.log.server.url.
【Error screenshot 2】

Edit hadoop_home/etc/hadoop/yarn-site.xml and add:
```xml
<property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
</property>
<property>
    <name>yarn.log.server.url</name>
    <!-- The value was truncated in the original; conventionally it points to the
         JobHistory web UI configured above, i.e. http://<jobhistory-host>:19888/jobhistory/logs -->
    <value>http://hadoop:19888/jobhistory/logs</value>
</property>
```
Restart the JobHistoryServer and the YARN cluster:
stop-yarn.sh
mr-jobhistory-daemon.sh stop historyserver
start-yarn.sh
mr-jobhistory-daemon.sh start historyserver
Go to hadoop_home/share/hadoop/mapreduce and run one of the official example jobs.
Checking the logs again afterwards now shows no errors.
```
[root@hadoop mapreduce]# hadoop jar hadoop-mapreduce-examples-2.6.0-cdh5.14.4.jar pi 2 3
```